Added SQS interactions for S3 source #1431
@@ -13,10 +13,13 @@ repositories {

dependencies {
    implementation project(':data-prepper-api')
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation project(':data-prepper-plugins:blocking-buffer')
Why did you add blocking-buffer here? Data Prepper API provides the Buffer interface.
I added this for testing S3Source using buffer.
For testing, could this import be added as testImplementation project(':data-prepper-plugins:blocking-buffer')?
@sbayer55's suggestion is the right approach for a testing dependency.
        messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
    } catch (SqsException e) {
        LOG.error(e.awsErrorDetails().errorMessage());
        System.exit(1);
This will exit all of Data Prepper. That is a far too severe consequence of an error communicating with SQS. I think that we want to log the exception, then wait some time, then try again. It could be appropriate to perform an exponential backoff.
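The log, wait, and retry approach suggested here could look roughly like the sketch below. `BackoffRetrier`, `delayFor`, and `callWithRetry` are hypothetical names, not code from this PR; the sketch writes to `System.err` rather than the project's logger and bounds the number of attempts, whereas a long-running worker would more likely keep retrying until the source is stopped.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the PR: retries a call with capped
// exponential backoff instead of terminating the JVM on the first error.
final class BackoffRetrier {
    private final Duration initialDelay;
    private final Duration maxDelay;

    BackoffRetrier(final Duration initialDelay, final Duration maxDelay) {
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
    }

    // Delay before retry number `attempt` (0-based): initialDelay doubled
    // `attempt` times, never exceeding maxDelay.
    Duration delayFor(final int attempt) {
        long millis = initialDelay.toMillis();
        for (int i = 0; i < attempt && millis < maxDelay.toMillis(); i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, maxDelay.toMillis()));
    }

    <T> T callWithRetry(final Callable<T> call, final int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (final InterruptedException e) {
                // Do not retry when the thread is being asked to stop.
                throw e;
            } catch (final Exception e) {
                last = e;
                System.err.println("Error reading from SQS (attempt " + (attempt + 1) + "): " + e);
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(delayFor(attempt).toMillis());
                }
            }
        }
        throw last;
    }
}
```

The caller decides what to do once all attempts fail; the process itself keeps running.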
    } while (true);
}

private S3EventNotification.S3EventNotificationRecord getS3EventMessages(Message message) {
I recommend renaming this so that it has a different verb than "get". Perhaps "convert" or "transform"? "Get" makes it seem that you are using a getter.
public class SqsOptions {
    private final int DEFAULT_MAXIMUM_MESSAGES = 10;
    private final int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 30;
    private final int DEFAULT_WAIT_TIME_SECONDS = 0;
I think this default should be 20 seconds (the current maximum).
import java.time.Duration;

public class SqsOptions {
    private final int DEFAULT_MAXIMUM_MESSAGES = 10;
These should be private static final (they are missing the static).
public class SqsOptions {
    private final int DEFAULT_MAXIMUM_MESSAGES = 10;
    private final int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 30;
Your choice here: You could make these defaults actual Duration objects. Since they will be static, you won't need to re-create the Duration in each field. I also think it is cleaner since the code only deals with Duration.
e.g.
private static final Duration DEFAULT_VISIBILITY_TIMEOUT = Duration.ofSeconds(20);
I like this suggestion.
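Taken together, the review suggestions for these defaults (make them static, use Duration, wait 20 seconds) might produce something like the following sketch. `SqsOptionsSketch` and its getters are hypothetical and deliberately not the PR's `SqsOptions`. The values 10 and 30 seconds come from the diff above; 20 seconds is the reviewer's proposed wait time.

```java
import java.time.Duration;

// Hypothetical illustration only: defaults declared once as static constants,
// with time-based settings held as Duration rather than raw int seconds.
final class SqsOptionsSketch {
    private static final int DEFAULT_MAXIMUM_MESSAGES = 10;
    private static final Duration DEFAULT_VISIBILITY_TIMEOUT = Duration.ofSeconds(30);
    private static final Duration DEFAULT_WAIT_TIME = Duration.ofSeconds(20);

    // Instance fields start at the defaults; a real configuration class would
    // let the pipeline configuration override them.
    private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES;
    private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT;
    private Duration waitTime = DEFAULT_WAIT_TIME;

    int getMaximumMessages() {
        return maximumMessages;
    }

    Duration getVisibilityTimeout() {
        return visibilityTimeout;
    }

    Duration getWaitTime() {
        return waitTime;
    }
}
```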
private final S3Client s3Client;

- private AwsCredentialsProvider createCredentialsProvider() {
+ public AwsCredentialsProvider createCredentialsProvider() {
Why is this public and not private?
for(int i = 0; i < s3SourceConfig.getSqsOptions().getThreadCount(); i++) {
    Thread sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Client, s3SourceConfig));
I think we will want to have a List<Thread> as a field. You can keep all the threads there. You will need to stop them when the stop() method for S3Source is called.
        messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
    } catch (SqsException e) {
        LOG.error(e.awsErrorDetails().errorMessage());
In general you should avoid logging just an error message from some external source. It is better to have:
LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage());
Relatedly - is awsErrorDetails() guaranteed to be non-null?
I'm not sure if awsErrorDetails() will be non-null. I'll check this one.
try {
    Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis());
} catch (InterruptedException e) {
    e.printStackTrace();
We should log this, not call e.printStackTrace().
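One way to log the interruption instead of printing a stack trace is sketched below. `PollDelay` and `sleepQuietly` are hypothetical names. The sketch uses `java.util.logging` only to stay self-contained; the code under review uses an SLF4J-style `LOG`. Restoring the interrupt flag on the worker's own thread is an added assumption, not something the reviewer asked for.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: sleeps between polls and logs an interruption
// instead of calling e.printStackTrace().
final class PollDelay {
    private static final Logger LOG = Logger.getLogger(PollDelay.class.getName());

    // Returns true if the full delay elapsed, false if the sleep was interrupted.
    static boolean sleepQuietly(final long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (final InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while waiting to poll SQS", e);
            // Preserve the interrupt status so the caller can decide to stop.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```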
3816ace to f65d28f
As a general comment, S3Source and SqsWorker have low test coverage. Are you planning to add functionality and test coverage to these classes in a future PR?
import java.util.Random;

public class BackoffUtils {
In place of utils, could the BackoffUtils class have a more descriptive name? Consider something like RetryState or BackoffManager. Without looking at the class file I expected BackoffUtils to contain some helper functions for parsing or transforming data.
if (shouldRetry()) {
    waitUntilNextTry();
    timeToWait += random.nextInt(1000);
Is there a benefit to a random wait as opposed to a fixed time or exponential growth time?
Yeah, it's to break the synchronization across the clients, thereby avoiding collisions.
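To make the jitter argument concrete, here is a small sketch that combines capped exponential growth with a random offset. `JitteredDelay` is a hypothetical class, not the PR's `BackoffUtils`; the 1000 ms jitter bound mirrors the `random.nextInt(1000)` call in the diff, and the base and cap are assumptions.

```java
import java.util.Random;

// Hypothetical sketch: exponential delay plus random jitter, so that several
// clients that fail at the same moment do not all retry at the same moment.
final class JitteredDelay {
    private static final int MAX_JITTER_MILLIS = 1000;

    private final Random random;
    private final long baseMillis;
    private final long capMillis;

    JitteredDelay(final Random random, final long baseMillis, final long capMillis) {
        this.random = random;
        this.baseMillis = baseMillis;
        this.capMillis = capMillis;
    }

    // Delay for retry number `attempt` (0-based): base doubled `attempt` times,
    // capped at capMillis, plus a jitter in [0, MAX_JITTER_MILLIS).
    long nextDelayMillis(final int attempt) {
        long exponential = baseMillis;
        for (int i = 0; i < attempt && exponential < capMillis; i++) {
            exponential *= 2;
        }
        return Math.min(exponential, capMillis) + random.nextInt(MAX_JITTER_MILLIS);
    }
}
```

With a fixed or purely exponential delay, workers that start together stay in lockstep; the random term spreads their retries apart.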
public S3Client createS3Client(final StsClient stsClient) {

    return software.amazon.awssdk.services.s3.S3Client.builder()
Can the fully qualified name be shortened from software.amazon.awssdk.services.s3.S3Client to S3Client? It looks like you already have the import.
import java.util.Random;

public class BackoffUtils {
Why are we opting to build our own exponential back off strategy? Is there a reason we cannot use the built-in AWS SDK retry functionality?
We do need a different mechanism in place for the entire thread. As the SQS processing runs in its own thread, it must not end until the source is shut down.
I think we should use both:
- AWS SDK retry to help with transient errors
- Catch exceptions in the thread, suppress, and wait. This goes beyond AWS SDK errors. Say, some message in the SQS queue is not actually JSON. We don't want the thread to fail for a single bad message.
That being said, perhaps this will not need any sophisticated back-off. Perhaps a straightforward Thread.sleep() is enough.
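The second point, not letting one bad message end the worker, can be illustrated with a minimal sketch. `ResilientMessageLoop` and `processAll` are hypothetical names, messages are plain strings here rather than SQS `Message` objects, and errors go to `System.err` instead of the project's logger.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: handle each message independently so that a single
// malformed message is logged and skipped rather than ending the loop.
final class ResilientMessageLoop {
    // Returns the number of messages that failed to process.
    static int processAll(final List<String> messages, final Consumer<String> handler) {
        int failures = 0;
        for (final String message : messages) {
            try {
                handler.accept(message);
            } catch (final RuntimeException e) {
                failures++;
                System.err.println("Skipping message that could not be processed: " + e.getMessage());
            }
        }
        return failures;
    }
}
```

A worker could call something like this on every poll and then sleep before polling again, which fits the simple Thread.sleep() option mentioned above.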
}

@Override
public void run() {
This function is getting very large in this partial state. I would encourage you to break it apart to improve maintainability.
This will also make it easier to unit test and can remove variable reassignment/mutation in the code below.
        .region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion()))
        .credentialsProvider(awsCredentialsProvider)
        .build();
Thread.currentThread().interrupt();
Data Prepper core is calling the stop() method. Thus, this current code is going to interrupt the Data Prepper core thread.
Also, rather than use interrupt(), I'd like to have a running/shutdown boolean flag in the worker thread. You then can use while(running) { ... } instead of while(true) { ... }.
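A rough sketch of the flag-based shutdown, combined with the earlier suggestion to keep the threads in a list, follows. `PollingWorker` and `WorkerGroup` are hypothetical stand-ins for `SqsWorker` and `S3Source`; the polling work is replaced by a counter and a short sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker: loops while a volatile flag is set instead of while(true).
final class PollingWorker implements Runnable {
    private volatile boolean running = true;
    private final AtomicInteger iterations = new AtomicInteger();

    @Override
    public void run() {
        while (running) {
            iterations.incrementAndGet(); // stand-in for polling and processing
            try {
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                // Restore the flag on this worker thread and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
    }

    int getIterations() {
        return iterations.get();
    }
}

// Hypothetical owner: keeps every worker and thread so stop() can reach them.
final class WorkerGroup {
    private final List<PollingWorker> workers = new ArrayList<>();
    private final List<Thread> threads = new ArrayList<>();

    void start(final int threadCount) {
        for (int i = 0; i < threadCount; i++) {
            final PollingWorker worker = new PollingWorker();
            final Thread thread = new Thread(worker, "sqs-worker-" + i);
            workers.add(worker);
            threads.add(thread);
            thread.start();
        }
    }

    // Signals every worker to finish, then waits briefly for each thread.
    void stop() throws InterruptedException {
        for (final PollingWorker worker : workers) {
            worker.stop();
        }
        for (final Thread thread : threads) {
            thread.join(1000);
        }
    }

    boolean anyAlive() {
        for (final Thread thread : threads) {
            if (thread.isAlive()) {
                return true;
            }
        }
        return false;
    }
}
```

Because stop() only flips flags and joins, the calling thread (Data Prepper core in the comment above) is never interrupted.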
f65d28f to 234bcdb
Codecov Report
@@ Coverage Diff @@
## main #1431 +/- ##
=========================================
Coverage 94.25% 94.25%
Complexity 1179 1179
=========================================
Files 165 165
Lines 3377 3377
Branches 276 276
=========================================
Hits 3183 3183
Misses 138 138
Partials 56 56
    assertThrows(IllegalStateException.class, () -> s3Source.start(testBuffer));
}

private BlockingBuffer<Record<Event>> getBuffer() {
Is there a reason you can't use a mock here?
There may be value in creating some integration tests in other PRs which use the blocking buffer. But for a unit test, we should use mocks if at all possible.
        .build();
}

private S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) {
I think we may want to make this part a separate class which is injected. One reason is that the SQS message may actually be in a different format in some cases. If the SQS queue is connected to SNS, then this becomes a string rather than JSON. So we might need to make this configurable by pipeline authors. I'm fine to keep it here for now if that helps merge it in though.
I think we can move this to a separate class once we decide on how pipeline authors can configure this.
// build s3ObjectPointer from S3EventNotificationRecord if event name starts with ObjectCreated
List<S3ObjectReference> addedObjects = new ArrayList<>();
for (Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry: s3EventNotificationRecords.entrySet()) {
    if (entry.getValue().getEventName().startsWith("ObjectCreated")) {
I'd like to create an S3EventFilter structure. There are a couple of motivations for this:
- We are also going to add an optional user-configuration to support filtering by bucketName or accountId.
- If we add SNS as an option in future iterations, this filter should be the same code.
Here is a possible design:
public interface S3EventFilter {
    Optional<S3EventNotification.S3EventNotificationRecord> filter(S3EventNotification.S3EventNotificationRecord notification);
}
class ObjectCreatedFilter implements S3EventFilter {
    @Override
    public Optional<S3EventNotification.S3EventNotificationRecord> filter(S3EventNotification.S3EventNotificationRecord notification) {
        if (notification.getEventName().startsWith("ObjectCreated"))
            return Optional.of(notification);
        else
            return Optional.empty();
    }
}
We can build more complicated filters for the bucketName and accountId in another PR.
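As an illustration of how such filters might compose once a bucket-name filter exists, here is a self-contained sketch. Every type in it is hypothetical: `EventRecordSketch` stands in for `S3EventNotification.S3EventNotificationRecord`, and `BucketNameFilterSketch` is only a guess at what the later PR could add.

```java
import java.util.Optional;

// Hypothetical stand-in for S3EventNotification.S3EventNotificationRecord.
final class EventRecordSketch {
    private final String eventName;
    private final String bucketName;

    EventRecordSketch(final String eventName, final String bucketName) {
        this.eventName = eventName;
        this.bucketName = bucketName;
    }

    String getEventName() {
        return eventName;
    }

    String getBucketName() {
        return bucketName;
    }
}

// Same shape as the proposed S3EventFilter, over the stand-in record type.
interface EventFilterSketch {
    Optional<EventRecordSketch> filter(EventRecordSketch record);
}

// Keeps only records whose event name starts with "ObjectCreated".
final class ObjectCreatedFilterSketch implements EventFilterSketch {
    @Override
    public Optional<EventRecordSketch> filter(final EventRecordSketch record) {
        return record.getEventName().startsWith("ObjectCreated")
                ? Optional.of(record)
                : Optional.empty();
    }
}

// Keeps only records from one configured bucket.
final class BucketNameFilterSketch implements EventFilterSketch {
    private final String bucketName;

    BucketNameFilterSketch(final String bucketName) {
        this.bucketName = bucketName;
    }

    @Override
    public Optional<EventRecordSketch> filter(final EventRecordSketch record) {
        return bucketName.equals(record.getBucketName())
                ? Optional.of(record)
                : Optional.empty();
    }
}
```

Since each filter returns an Optional, filters can be chained with flatMap, for example `createdOnly.filter(record).flatMap(logsOnly::filter)`.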
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
import java.util.Objects;

public class S3ObjectReference {
You might get a merge conflict on this file since I also added this into main. Please use the one I provided as it has toString() and is unit tested.
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
5a71690 to 093a9a3
}

@Override
public void start(Buffer<Record<Event>> buffer) {
    if (buffer == null) {
The buffer is never used by the S3Service or the SqsService. Who is going to be responsible for putting the item in the buffer (S3Service or SqsService)?
The S3ObjectWorker handles this:
Line 30 in be35437
private final Buffer<Record<Event>> buffer;
Also, @asifsmohammed and I are coordinating on the work here. He created this PR before the S3ObjectWorker was present in main, so some portions like this are currently unused in his branch and work.
I'll update the logic with buffer here.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class SqsWorkerTest {
The run method is not tested. We are missing out on some core test coverage and should add test cases.
private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private S3SourceConfig s3SourceConfig;
These private variables should be final to prevent re-assignment.
S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) {
    // TODO: should return message id and receipt handle if successfully converted to event
    return null;
I have a few concerns over this method and subsequent workflow. I have some ideas for improvements but I want to understand a few things first.
What are we adding an S3Object to? I find this method name confusing and was really confused by the usage in the run() method.
Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)
How is the receipt used by the calling function? Do we need it?
> Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)
@asifsmohammed and I discussed a couple approaches here:
- We can make this templated and add something like T getNotificationMetadata() which in this case returns SQS metadata. It could be extended for SNS metadata if that were ever added.
- We could extend this class with a sub-class.
In either case, the SQS metadata would include the ReceiptHandle and MessageId.
I'm fine with either approach. But, the inheritance approach would have the benefit of completely abstracting this metadata from the S3 code.
My initial thought was to add ReceiptHandle and MessageId to S3ObjectReference which will be used to get S3 Object.
If S3 object is successfully processed we return the same S3ObjectReference to delete message from queue.
I'll create a subclass which contains ReceiptHandle and MessageId.
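The subclass approach might look roughly like this sketch. Both classes are hypothetical: `ObjectReferenceSketch` stands in for the real `S3ObjectReference` (whose constructor or builder is not shown in this thread), and the field names are assumptions based on the ReceiptHandle and MessageId mentioned above.

```java
// Hypothetical stand-in for S3ObjectReference: identifies an S3 object only.
class ObjectReferenceSketch {
    private final String bucketName;
    private final String key;

    ObjectReferenceSketch(final String bucketName, final String key) {
        this.bucketName = bucketName;
        this.key = key;
    }

    String getBucketName() {
        return bucketName;
    }

    String getKey() {
        return key;
    }
}

// Hypothetical subclass: carries the SQS metadata needed to delete the message
// later, while S3 code can keep treating it as a plain object reference.
final class SqsObjectReferenceSketch extends ObjectReferenceSketch {
    private final String messageId;
    private final String receiptHandle;

    SqsObjectReferenceSketch(final String bucketName, final String key,
                             final String messageId, final String receiptHandle) {
        super(bucketName, key);
        this.messageId = messageId;
        this.receiptHandle = receiptHandle;
    }

    String getMessageId() {
        return messageId;
    }

    String getReceiptHandle() {
        return receiptHandle;
    }
}
```

All fields are final and set in the constructor, which avoids the mutation and nullable values raised earlier in this thread.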
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
Thanks for making those changes. Let's address the remaining in the next PR.
Thanks for the consistent updates @asifsmohammed!
* Added sqs configuration and basic sqs interactions Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> Signed-off-by: Finn Roblin <finnrobl@amazon.com>
Description
getMessage
from SQSS3EventNotificationRecord
Used AWS SDK v1 for deserializing SQS message into S3EventNotificationRecord since it's not yet ported to v2. For more info see: aws/aws-sdk-java-v2#1197
Issues Resolved
Resolves #1425
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.